package com.cloudhopper.commons.util.windowing;


import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.cloudhopper.commons.util.UnwrappedWeakReference;


/**
 * A bounded "window" of outstanding requests, as used by windowed protocols
 * over connections such as TCP/IP where several requests may be in flight
 * before their responses arrive. Requests may be offered, completed, failed
 * or cancelled from different threads.
 * <p>
 * The life cycle of a request is:
 * <ol>
 *   <li>The caller {@code offer()}s the request. If a slot is free it is
 *       accepted immediately; otherwise the caller blocks until a slot is
 *       freed, the offer times out, or pending offers are aborted.</li>
 *   <li>Once accepted, a future for the request is stored in the window and
 *       returned to the caller.</li>
 *   <li>The request leaves the window when it is completed, failed or
 *       cancelled (including cancellation of expired requests).</li>
 * </ol>
 * If an executor is supplied, a {@code WindowMonitor} task is scheduled on it
 * at a fixed delay (presumably to expire requests; see {@code WindowMonitor}).
 *
 * @param <K> type of the key identifying a request (typically a protocol
 *            sequence number)
 * @param <R> type of the request
 * @param <P> type of the response
 */
public class Window<K,R,P> {
    private static final Logger logger = LoggerFactory.getLogger(Window.class);

    // maximum number of requests that may be outstanding at once
    private final int maxSize;
    // outstanding requests, by key; insertions happen while holding "lock"
    private final ConcurrentHashMap<K,DefaultWindowFuture<K,R,P>> futures;
    private final ReentrantLock lock;
    // signalled whenever one or more futures leave the window (or offers are aborted)
    private final Condition completedCondition;
    // number of threads currently blocked in offer() waiting for a free slot
    private final AtomicInteger pendingOffers;
    // set by abortPendingOffers(); reset once the last pending offer has ended
    private final AtomicBoolean pendingOffersAborted;
    // executor used to schedule the monitor task (such as expiring requests); may be null
    private final ScheduledExecutorService executor;
    // handle of the scheduled monitor task; null while the monitor is not running
    private ScheduledFuture<?> monitorHandle;
    private final WindowMonitor monitor;
    private final long monitorInterval;
    private final CopyOnWriteArrayList<UnwrappedWeakReference<WindowListener<K,R,P>>> listeners;

    /**
     * Creates a window without a monitor and without listeners.
     * @param size The maximum number of outstanding requests. Must be &gt; 0.
     * @throws IllegalArgumentException If size is &lt;= 0
     */
    public Window(int size) {
        this(size, null, 0, null, null);
    }
    
    /**
     * Creates a window with an optional monitor and an optional listener,
     * using no explicit monitor thread name.
     * @param size The maximum number of outstanding requests. Must be &gt; 0.
     * @param executor The executor the monitor task is scheduled on. If null,
     *      no monitor is created.
     * @param monitorInterval The delay (in milliseconds) between monitor runs.
     * @param listener A listener to send window events to. May be null.
     * @throws IllegalArgumentException If size is &lt;= 0
     */
    public Window(int size, ScheduledExecutorService executor, long monitorInterval, WindowListener<K,R,P> listener) {
        this(size, executor, monitorInterval, listener, null);
    }
    
    /**
     * Creates a window with an optional monitor and an optional listener.
     * @param size The maximum number of outstanding requests. Must be &gt; 0.
     * @param executor The executor the monitor task is scheduled on. If null,
     *      no monitor is created and {@link #startMonitor()} returns false.
     * @param monitorInterval The initial delay and the delay between monitor
     *      runs, in milliseconds.
     * @param listener A listener to send window events to. May be null.
     * @param monitorThreadName The name handed to the monitor task. May be null.
     * @throws IllegalArgumentException If size is &lt;= 0
     */
    public Window(int size, ScheduledExecutorService executor, long monitorInterval, WindowListener<K,R,P> listener, String monitorThreadName) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be > 0");
        }
        this.maxSize = size;
        this.futures = new ConcurrentHashMap<K,DefaultWindowFuture<K,R,P>>(size*2);
        this.lock = new ReentrantLock();
        this.completedCondition = this.lock.newCondition();
        this.pendingOffers = new AtomicInteger(0);
        this.pendingOffersAborted = new AtomicBoolean(false);
        this.executor = executor;
        this.monitorInterval = monitorInterval;
        this.listeners = new CopyOnWriteArrayList<UnwrappedWeakReference<WindowListener<K,R,P>>>();
        if (listener != null) {
            this.listeners.add(new UnwrappedWeakReference<WindowListener<K,R,P>>(listener));
        }
        if (this.executor != null) {
            // the monitor starts running as soon as the window is constructed
            this.monitor = new WindowMonitor(this, monitorThreadName);
            this.monitorHandle = this.executor.scheduleWithFixedDelay(this.monitor, this.monitorInterval, this.monitorInterval, TimeUnit.MILLISECONDS);
        } else {
            this.monitor = null;
            this.monitorHandle = null;
        }
    }

    /**
     * Gets the maximum number of requests that may be outstanding at once.
     * @return The maximum size of the window
     */
    public int getMaxSize() {
        return this.maxSize;
    }

    /**
     * Gets the number of requests currently held in the window.
     * @return The current size of the window
     */
    public int getSize() {
        return this.futures.size();
    }
    
    /**
     * Gets the number of free slots, i.e. the maximum size minus the
     * current size.
     * @return The number of free slots
     */
    public int getFreeSize() {
        return this.maxSize - this.futures.size();
    }
    
    /**
     * Checks whether a request with the given key is currently in the window.
     * @param key The key of the request
     * @return True if the window contains the key, otherwise false
     */
    public boolean containsKey(K key) {
        return this.futures.containsKey(key);
    }
    
    /**
     * Gets the future of the request with the given key.
     * @param key The key of the request
     * @return The future, or null if the window does not contain the key
     */
    public WindowFuture<K,R,P> get(K key) {
        return this.futures.get(key);
    }
    
    /**
     * Adds a listener unless an equal reference is already registered. The
     * listener is wrapped in an {@code UnwrappedWeakReference}.
     * @param listener The listener to add
     */
    public void addListener(WindowListener<K,R,P> listener) {
        this.listeners.addIfAbsent(new UnwrappedWeakReference<WindowListener<K,R,P>>(listener));
    }
    
    /**
     * Removes a previously added listener.
     * @param listener The listener to remove
     */
    public void removeListener(WindowListener<K,R,P> listener) {
        this.listeners.remove(new UnwrappedWeakReference<WindowListener<K,R,P>>(listener));
    }
    
    /**
     * Gets the live list of registered listeners (not a copy).
     * @return The listeners of this window
     */
    List<UnwrappedWeakReference<WindowListener<K,R,P>>> getListeners() {
        return this.listeners;
    }
    
    /**
     * Destroys the window: aborts pending offers, cancels every outstanding
     * request, removes all listeners and stops the monitor. Aborting pending
     * offers is best-effort; a failure there does not prevent the remaining
     * steps from running.
     */
    public synchronized void destroy() {
        try {
            this.abortPendingOffers();
        } catch (InterruptedException e) {
            // keep tearing down, but do not lose the caller's interrupt status
            Thread.currentThread().interrupt();
        } catch (RuntimeException e) {
            logger.warn("Unable to abort pending offers while destroying window", e);
        }
        this.cancelAll();
        this.listeners.clear();
        this.stopMonitor();
    }
    
    /**
     * Starts the monitor if it is not already running.
     * @return True if this window has an executor (the monitor is running
     *      once this method returns), false if the window was created
     *      without an executor and therefore cannot be monitored.
     */
    public synchronized boolean startMonitor() {
        if (this.executor != null) {
            if (this.monitorHandle == null) {
                this.monitorHandle = this.executor.scheduleWithFixedDelay(this.monitor, this.monitorInterval, this.monitorInterval, TimeUnit.MILLISECONDS);
            }
            return true;
        }
        return false;
    }
    
    /**
     * Stops the monitor if it is running. It may be restarted with
     * {@link #startMonitor()}.
     */
    public synchronized void stopMonitor() {
        if (this.monitorHandle != null) {
            this.monitorHandle.cancel(true);
            this.monitorHandle = null;
        }
    }

    /**
     * Creates a snapshot of the outstanding requests, sorted by the natural
     * ordering of their keys. The returned map is a copy; later changes to
     * the window are not reflected in it.
     * @return A new sorted map of key to future
     */
    public Map<K,WindowFuture<K,R,P>> createSortedSnapshot() {
        Map<K,WindowFuture<K,R,P>> sortedRequests = new TreeMap<K,WindowFuture<K,R,P>>();
        sortedRequests.putAll(this.futures);
        return sortedRequests;
    }
    
    /**
     * Offers a request to the window, waiting up to offerTimeoutMillis for a
     * free slot. The request never expires and the caller state hint is
     * CALLER_NOT_WAITING.
     * @param key The key of the request, typically the protocol sequence number
     * @param request The request object
     * @param offerTimeoutMillis The maximum time (in milliseconds) to wait
     *      for a free slot. Must be &gt;= 0.
     * @return A future representing pending completion of the request 
     * @throws DuplicateKeyException If the key already exists in the window
     * @throws OfferTimeoutException If no slot became free in time, or if the
     *      pending offer was aborted (as a PendingOfferAbortedException)
     * @throws InterruptedException If the thread was interrupted while waiting
     */
    public WindowFuture<K,R,P> offer(K key, R request, long offerTimeoutMillis) throws DuplicateKeyException, OfferTimeoutException, InterruptedException {
        return this.offer(key, request, offerTimeoutMillis, -1, false);
    }

    /**
     * Offers a request to the window, waiting up to offerTimeoutMillis for a
     * free slot. The caller state hint is CALLER_NOT_WAITING.
     * @param key The key of the request, typically the protocol sequence number
     * @param request The request object
     * @param offerTimeoutMillis The maximum time (in milliseconds) to wait
     *      for a free slot. Must be &gt;= 0.
     * @param expireTimeoutMillis The time (in milliseconds) after acceptance
     *      at which the request expires. A value &lt;= 0 disables expiry.
     * @return A future representing pending completion of the request 
     * @throws DuplicateKeyException If the key already exists in the window
     * @throws OfferTimeoutException If no slot became free in time, or if the
     *      pending offer was aborted (as a PendingOfferAbortedException)
     * @throws InterruptedException If the thread was interrupted while waiting
     */
    public WindowFuture<K,R,P> offer(K key, R request, long offerTimeoutMillis, long expireTimeoutMillis) throws DuplicateKeyException, OfferTimeoutException, InterruptedException {
        return this.offer(key, request, offerTimeoutMillis, expireTimeoutMillis, false);
    }

    /**
     * Offers a request to the window, waiting up to offerTimeoutMillis for a
     * free slot.
     * @param key The key of the request, typically the protocol sequence number
     * @param request The request object
     * @param offerTimeoutMillis The maximum time (in milliseconds) to wait
     *      for a free slot. Must be &gt;= 0.
     * @param expireTimeoutMillis The time (in milliseconds) after acceptance
     *      at which the request expires. A value &lt;= 0 disables expiry.
     * @param callerWaitingHint If true the future is created with the
     *      CALLER_WAITING hint (the caller intends to await() the future),
     *      otherwise with CALLER_NOT_WAITING.
     * @return A future representing pending completion of the request 
     * @throws IllegalArgumentException If offerTimeoutMillis is &lt; 0
     * @throws DuplicateKeyException If the key already exists in the window
     * @throws OfferTimeoutException If no slot became free in time
     * @throws PendingOfferAbortedException If the offer was aborted by a call
     *      to abortPendingOffers() while it was waiting for a slot
     * @throws InterruptedException If the thread was interrupted while
     *      acquiring the lock or while waiting for a slot
     */
    public WindowFuture<K,R,P> offer(K key, R request, long offerTimeoutMillis, long expireTimeoutMillis, boolean callerWaitingHint) throws DuplicateKeyException, OfferTimeoutException, PendingOfferAbortedException, InterruptedException {
        if (offerTimeoutMillis < 0) {
            throw new IllegalArgumentException("offerTimeoutMillis must be >= 0 [actual=" + offerTimeoutMillis + "]");
        }
        
        // fast path: reject an obviously duplicate key without taking the lock
        if (this.futures.containsKey(key)) {
            throw new DuplicateKeyException("The key [" + key + "] already exists in the window");
        }

        long offerTimestamp = System.currentTimeMillis();
        
        this.lock.lockInterruptibly();
        try {
            // wait until a slot is free, the offer times out, or offers are aborted
            while (getFreeSize() <= 0) {
                long currentOfferTime = System.currentTimeMillis() - offerTimestamp;
                if (currentOfferTime >= offerTimeoutMillis) {
                    throw new OfferTimeoutException("Unable to accept offer within [" + offerTimeoutMillis + " ms] (window full)");
                }
                
                // check if waiting for a slot was aborted (terminate early)
                if (this.pendingOffersAborted.get()) {
                    throw new PendingOfferAbortedException("Pending offer aborted (by an explicit call to abortPendingOffers())");
                }
                
                // calculate the amount of timeout remaining
                long remainingOfferTime = offerTimeoutMillis - currentOfferTime;
                boolean abortPendingOffer;
                this.beginPendingOffer();
                try {
                    // wait for a slot to be freed (releases the lock while waiting)
                    this.completedCondition.await(remainingOfferTime, TimeUnit.MILLISECONDS);
                } finally {
                    // always balance beginPendingOffer(); the abort is raised
                    // after this block so that an InterruptedException thrown
                    // by await() is not masked by an exception from finally
                    abortPendingOffer = this.endPendingOffer();
                }
                if (abortPendingOffer) {
                    throw new PendingOfferAbortedException("Pending offer aborted (by an explicit call to abortPendingOffers())");
                }
            }
            
            // re-check under the lock: another thread may have inserted the
            // same key after the unlocked check above (or while this thread
            // was waiting for a slot); a blind put() would replace its future
            if (this.futures.containsKey(key)) {
                throw new DuplicateKeyException("The key [" + key + "] already exists in the window");
            }
            
            long acceptTimestamp = System.currentTimeMillis();
            long expireTimestamp = (expireTimeoutMillis > 0 ? (acceptTimestamp + expireTimeoutMillis) : -1);
            int callerStateHint = (callerWaitingHint ? WindowFuture.CALLER_WAITING : WindowFuture.CALLER_NOT_WAITING);
            DefaultWindowFuture<K,R,P> future = new DefaultWindowFuture<K,R,P>(this, lock, completedCondition, key, request, callerStateHint, offerTimeoutMillis, (futures.size() + 1), offerTimestamp, acceptTimestamp, expireTimestamp);
            this.futures.put(key, future);
            return future;
        } finally {
            this.lock.unlock();
        }
    }
    
    /**
     * Gets the number of threads currently blocked in offer() waiting for
     * a free slot.
     * @return The number of pending offers
     */
    public int getPendingOfferCount() {
        return this.pendingOffers.get();
    }
    
    /**
     * Registers the calling thread as waiting for a free slot.
     */
    private void beginPendingOffer() {
        this.pendingOffers.incrementAndGet();
    }
    
    /**
     * Unregisters the calling thread as waiting for a free slot.
     * @return True if a pending offer should be aborted. False if a pending
     *      offer can continue waiting if needed.
     */
    private boolean endPendingOffer() {
        int newValue = this.pendingOffers.decrementAndGet();
        // if newValue reaches zero, make sure to always reset "pendingOffersAborted"
        if (newValue == 0) {
            // if pendingOffersAborted was true, then reset it back to false, and
            // return true to make sure the caller knows to cancel waiting
            return this.pendingOffersAborted.compareAndSet(true, false);
        } else {
            // other offers are still pending: report the flag but leave it set for them
            return this.pendingOffersAborted.get();
        }
    }
    
    /**
     * Aborts all offers that are currently waiting for a free slot. The
     * waiting callers are woken up and fail with a
     * PendingOfferAbortedException.
     * @return True if there were pending offers to abort, otherwise false
     * @throws InterruptedException If the thread was interrupted while
     *      acquiring the lock
     */
    public boolean abortPendingOffers() throws InterruptedException {
        this.lock.lockInterruptibly();
        try {
            if (this.pendingOffers.get() > 0) {
                this.pendingOffersAborted.set(true);
                this.completedCondition.signalAll();
                return true;
            } else {
                return false;
            }
        } finally {
            this.lock.unlock();
        }
    }
    
    /**
     * Completes the request with the given key by associating a response
     * with its future, and removes it from the window.
     * @param key The key of the request
     * @param response The response. Must not be null.
     * @return The completed future, or null if the window does not (or no
     *      longer) contain the key
     * @throws IllegalArgumentException If response is null
     * @throws InterruptedException If the thread was interrupted while
     *      acquiring the lock
     */
    public WindowFuture<K,R,P> complete(K key, P response) throws InterruptedException {
        if (response == null) {
            throw new IllegalArgumentException("Null responses are illegal. Use cancel() instead.");
        }
        
        // cheap unlocked check; remove() below is the authoritative one
        if (!this.futures.containsKey(key)) {
            return null;
        }

        this.lock.lockInterruptibly();
        try {
            // try to remove future from window
            DefaultWindowFuture<K,R,P> future = this.futures.remove(key);
            if (future == null) {
                return null;
            }
            
            // set success using the helper method; signalling and removal
            // from the window are handled right here instead
            future.completeHelper(response, System.currentTimeMillis());

            // signal that a future is completed
            this.completedCondition.signalAll();

            return future;
        } finally {
            this.lock.unlock();
        }
    }
    
    /**
     * Fails the request with the given key by associating a cause with its
     * future, and removes it from the window.
     * @param key The key of the request
     * @param t The cause of the failure. Must not be null.
     * @return The failed future, or null if the window does not (or no
     *      longer) contain the key
     * @throws IllegalArgumentException If t is null
     * @throws InterruptedException If the thread was interrupted while
     *      acquiring the lock
     */
    public WindowFuture<K,R,P> fail(K key, Throwable t) throws InterruptedException {
        if (t == null) {
            throw new IllegalArgumentException("Null throwables are illegal. Use cancel() instead.");
        }
        
        // cheap unlocked check; remove() below is the authoritative one
        if (!this.futures.containsKey(key)) {
            return null;
        }

        this.lock.lockInterruptibly();
        try {
            // try to remove future from window
            DefaultWindowFuture<K,R,P> future = this.futures.remove(key);
            if (future == null) {
                return null;
            }
            
            // set failed using the helper method; signalling and removal
            // from the window are handled right here instead
            future.failedHelper(t, System.currentTimeMillis());

            // signal that a future is completed
            this.completedCondition.signalAll();

            return future;
        } finally {
            this.lock.unlock();
        }
    }
    
    /**
     * Fails every request currently in the window with the given cause and
     * removes all of them from the window.
     * @param t The cause of the failure
     * @return The failed futures, or null if the window was empty when this
     *      method was called
     * @throws InterruptedException Declared for callers; the lock itself is
     *      acquired uninterruptibly
     */
    public List<WindowFuture<K,R,P>> failAll(Throwable t) throws InterruptedException {
        if (this.futures.size() <= 0) {
            return null;
        }
        
        List<WindowFuture<K,R,P>> failed = new ArrayList<WindowFuture<K,R,P>>();
        long now = System.currentTimeMillis();
        this.lock.lock();
        try {
            // fail every request this window contains
            for (DefaultWindowFuture<K,R,P> future : this.futures.values()) {
                failed.add(future);
                future.failedHelper(t, now);
            }
            
            if (failed.size() > 0) {
                this.futures.clear();
                // signal that a future is completed
                this.completedCondition.signalAll();
            }
        } finally {
            this.lock.unlock();
        }
        return failed;
    }
    
    /**
     * Cancels the request with the given key and removes it from the window.
     * @param key The key of the request
     * @return The cancelled future, or null if the window does not (or no
     *      longer) contain the key
     * @throws InterruptedException If the thread was interrupted while
     *      acquiring the lock
     */
    public WindowFuture<K,R,P> cancel(K key) throws InterruptedException {
        // cheap unlocked check; remove() below is the authoritative one
        if (!this.futures.containsKey(key)) {
            return null;
        }

        this.lock.lockInterruptibly();
        try {
            // try to remove future from window
            DefaultWindowFuture<K,R,P> future = this.futures.remove(key);
            if (future == null) {
                return null;
            }
            
            // set cancelled using the helper method; signalling and removal
            // from the window are handled right here instead
            future.cancelHelper(System.currentTimeMillis());

            // signal that a future is completed
            this.completedCondition.signalAll();

            return future;
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Cancels every request currently in the window and removes all of them
     * from the window.
     * @return The cancelled futures, or null if the window was empty when
     *      this method was called
     */
    public List<WindowFuture<K,R,P>> cancelAll() {
        if (this.futures.size() <= 0) {
            return null;
        }
        
        List<WindowFuture<K,R,P>> cancelled = new ArrayList<WindowFuture<K,R,P>>();
        long now = System.currentTimeMillis();
        this.lock.lock();
        try {
            // cancel every request this window contains
            for (DefaultWindowFuture<K,R,P> future : this.futures.values()) {
                cancelled.add(future);
                future.cancelHelper(now);
            }
            
            if (cancelled.size() > 0) {
                this.futures.clear();
                // signal that a future is completed
                this.completedCondition.signalAll();
            }
        } finally {
            this.lock.unlock();
        }
        return cancelled;
    }
    
    /**
     * Cancels every request whose expire timestamp has been reached and
     * removes those requests from the window. Requests without an expire
     * timestamp are left untouched.
     * @return The expired (now cancelled) futures, possibly empty, or null
     *      if the window was empty when this method was called
     */
    public List<WindowFuture<K,R,P>> cancelAllExpired() {
        if (this.futures.size() <= 0) {
            return null;
        }
        
        List<WindowFuture<K,R,P>> expired = new ArrayList<WindowFuture<K,R,P>>();
        long now = System.currentTimeMillis();
        this.lock.lock();
        try {
            // check every request this window contains and see if it's expired
            for (DefaultWindowFuture<K,R,P> future : this.futures.values()) {
                if (future.hasExpireTimestamp() && now >= future.getExpireTimestamp()) {
                    expired.add(future);
                    future.cancelHelper(now);
                }
            }
            
            if (expired.size() > 0) {
                // take all expired requests and remove them from the window
                for (WindowFuture<K,R,P> future : expired) {
                    this.futures.remove(future.getKey());
                }
                // signal that a future is completed
                this.completedCondition.signalAll();
            }
        } finally {
            this.lock.unlock();
        }
        return expired;
    }
    
    /**
     * Removes the future with the given key from the window without
     * changing the future's state and without signalling waiters.
     * @param key The key of the request
     */
    void removeHelper(K key) {
        this.futures.remove(key);
    }
}
